package com.fnd.servicechat.ws.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fnd.servicechat.cache.LikeSomeCacheTemplate;
import com.fnd.servicechat.cache.RedisCacheTemplate;
import com.fnd.servicechat.service.IUserService;
import com.fnd.servicechat.ws.messaging.*;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thymeleaf.util.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Handles chat traffic received from client channels: login, point-to-point and group delivery of
 * text and file messages, and caching/replaying messages for recipients that are offline.
 *
 * <p>Online state is tracked in two places: the cache maps {@code userId -> clientId}, and
 * {@link #group_map} maps {@code clientId -> Channel}. The clientId is the channel's long-text id.
 *
 * @Author Administrator
 * @Date 2019/11/15 13:16
 * @Version 1.0
 */
@Component
public class MessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);

    /**
     * Registry of online channels; the key is the channel's long-text id (the clientId) and the
     * value is the channel itself.
     */
    public static Map<String, Channel> group_map= new ConcurrentHashMap<>();
    // Not referenced in this class. Per the original author's note it persists messages to
    // MongoDB asynchronously -- TODO confirm against its usages elsewhere.
    @Autowired
    private LikeSomeCacheTemplate cacheTemplate;
    // Holds the userId -> clientId mapping of logged-in users and the per-user offline message
    // lists stored under the key "offline_<userId>".
    @Autowired
    private RedisCacheTemplate redisCacheTemplate;
    @Autowired
    private IUserService userService;

    /**
     * Handles a client login.
     *
     * <p>When the user id is known to the user service, the user is marked online, the channel is
     * registered, a login-success result is written back and any cached offline messages are
     * replayed. Otherwise a failure result is written and the channel is closed.
     *
     * @param message message received by the server; supplies the user id
     * @param channel channel the login request arrived on
     */
    public void login(SendMessage message, Channel channel){
        String clientId = channel.id().asLongText();
        if (!userService.isExitByUserId(message.getUserId())) {
            logger.debug("会话{" + clientId + "},用户{" + message.getUserId() + "}登录失败");
            channel.writeAndFlush(MessageFactory.buildMessage(new ResultMessage<String>(ResultMessage.RESULT_Type_MESSAGE_SERVICE, "会话创建失败")));
            channel.close();
            return;
        }
        logger.debug("会话{" + clientId + "},用户{" + message.getUserId() + "}登录成功");
        // The online mapping must be stored before the replay below, because replayed
        // point-to-point messages are routed through the normal lookup path.
        save(message.getUserId(), clientId);
        saveChannel(clientId, channel);
        channel.writeAndFlush(MessageFactory.buildMessage(new ResultMessage<String>(ResultMessage.RESULT_Type_LOGIN_SUCCESS, "会话创建成功")));
        Online_message_push(message, channel);
    }

    /**
     * Delivers a text message either to a single user or to every other member of a group.
     * Recipients that are offline get the message cached for later delivery. An unknown message
     * type causes the sender's channel to be removed from the registry.
     *
     * @param message message received by the server
     * @param channel channel of the sender
     */
    public void message(SendMessage message, Channel channel){
        logger.debug("消息处理，会话{" + channel.id().asLongText() + "},源类型{" + message.getMessageType() + "} 100-点对点 101-群聊");
        if(SendMessage.MESSAGETYPE_POINT_TO_POINT == message.getMessageType()){
            Channel toChannel = findOnlineChannel(message.getToUserId());
            if(null == toChannel){
                handleOfflineRecipient(message, channel);
                return;
            }
            toChannel.writeAndFlush(MessageFactory.buildMessage(new ResultMessage(ResultMessage.RESULT_Type_MESSAGE_SENDING, message.getUserId() ,message.getData())));
        }else if(SendMessage.MESSAGETYPE_POINT_TO_GROUP == message.getMessageType()){
            List<Integer> userIds = loadGroupMembers(message, channel);
            if(null == userIds){
                return;
            }
            ResultMessage result = new ResultMessage(ResultMessage.RESULT_TYPE_MESSAGE_GROUPSENDING, message.getUserId() ,message.getData());
            result.setGroupId(message.getGroupId());
            broadcastToGroup(message, userIds, result);
        }else{
            rejectUnknownMessageType(channel);
        }
    }

    /**
     * Delivers a file message either to a single user or to every other member of a group. The
     * message payload is parsed as JSON into an {@link MFile} before it is forwarded. Offline
     * recipients and unknown message types are handled as in {@link #message}.
     *
     * @param message message received by the server
     * @param channel channel of the sender
     */
    public void fileMessage(SendMessage message, Channel channel){
        logger.debug("上传文件处理消息，会话{" + channel.id().asLongText() + "},源类型{" + message.getMessageType() + "} 100-点对点 101-群聊");
        if(SendMessage.MESSAGETYPE_POINT_TO_POINT == message.getMessageType()){
            Channel toChannel = findOnlineChannel(message.getToUserId());
            if(null == toChannel){
                handleOfflineRecipient(message, channel);
                return;
            }
            ResultMessage<MFile> result = new ResultMessage(ResultMessage.RESULT_TYPE_FILEMESSAGE_SENDING, message.getUserId() ,message.getData());
            result.setData(parseFile(message));
            toChannel.writeAndFlush(MessageFactory.buildMessage(result));
        }else if(SendMessage.MESSAGETYPE_POINT_TO_GROUP == message.getMessageType()){
            List<Integer> userIds = loadGroupMembers(message, channel);
            if(null == userIds){
                return;
            }
            ResultMessage result = new ResultMessage(ResultMessage.RESULT_TYPE_FILEMESSAGE_GROUPING, message.getUserId() ,message.getData());
            result.setGroupId(message.getGroupId());
            result.setData(parseFile(message));
            broadcastToGroup(message, userIds, result);
        }else{
            rejectUnknownMessageType(channel);
        }
    }

    /** Returns the registered channel of an online user, or {@code null} when none is known. */
    private Channel findOnlineChannel(Integer userId){
        String clientId = getOnLineUserClientId(userId);
        return StringUtils.isEmpty(clientId) ? null : getChannel(clientId);
    }

    /** Tells the sender the recipient is offline and caches the message for later delivery. */
    private void handleOfflineRecipient(SendMessage message, Channel channel){
        logger.warn("接收用户{" + message.getToUserId() + "}不在线");
        channel.writeAndFlush(MessageFactory.buildMessage(new ResultMessage<String>(ResultMessage.RESULT_Type_MESSAGE_SERVICE, "对方不在线")));
        offline_message_save(message.getToUserId(), message);
    }

    /**
     * Looks up the member ids of the message's group; when the lookup yields {@code null} the
     * failure is logged and reported to the sender, and {@code null} is returned.
     */
    private List<Integer> loadGroupMembers(SendMessage message, Channel channel){
        List<Integer> userIds = userService.getAllUserIdsByGroudId(message.getGroupId());
        if(null == userIds){
            logger.warn("群组会话{" + channel.id().asLongText() + "}获取群组信息失败");
            channel.writeAndFlush(MessageFactory.buildMessage(new ResultMessage<String>(ResultMessage.RESULT_Type_MESSAGE_SERVICE, "获取群组信息失败")));
        }
        return userIds;
    }

    /**
     * Sends {@code result} to every online group member except the sender and caches the original
     * message for members that are offline.
     */
    private void broadcastToGroup(SendMessage message, List<Integer> userIds, ResultMessage result){
        // Same value for every recipient, so it is set once instead of on each iteration.
        result.setFromUserId(message.getUserId());
        for(Integer userId : userIds) {
            // equals() rather than ==/!=: comparing boxed Integers by reference only works for
            // small cached values and would echo the message back to senders with larger ids.
            if(userId == null || userId.equals(message.getUserId())){
                continue;
            }
            Channel toChannel = findOnlineChannel(userId);
            if(null == toChannel){
                logger.warn("群组-接收用户{" + userId + "}不在线,无法进行发送");
                offline_message_save(userId, message);
                continue;
            }
            toChannel.writeAndFlush(MessageFactory.buildMessage(result));
        }
    }

    /** Logs an unsupported message type and removes the sender's channel from the registry. */
    private void rejectUnknownMessageType(Channel channel){
        logger.warn("会话{" + channel.id().asLongText() + ",非法消息源类型");
        removeChannel(channel.id().asLongText());
    }

    /** Parses the message payload (its {@code toString()} form) as JSON into an {@link MFile}. */
    private MFile parseFile(SendMessage message){
        return JSON.toJavaObject(JSONObject.parseObject(message.getData().toString()), MFile.class);
    }

    /**
     * Marks a user as online by storing the user's session id in the cache.
     *
     * @param key user id
     * @param value the user's session clientId
     */
    public void save(Integer key, Object value){
        redisCacheTemplate.set(key.toString(), value);
    }

    /**
     * Removes the user's online-state entry from the cache.
     *
     * @param userId user id
     */
    public void quit(Integer userId){
        redisCacheTemplate.del(userId.toString());
    }

    /**
     * Reads the session clientId stored for a user.
     *
     * @param userId user id
     * @return the value cached under the user id, cast to {@code String}
     */
    public String getOnLineUserClientId(Integer userId){
        return (String) redisCacheTemplate.get(userId.toString());
    }

    /**
     * Registers a channel in {@link #group_map}.
     *
     * @param clientId session id of the channel
     * @param channel the channel to register
     */
    public void saveChannel(String clientId, Channel channel){
        group_map.put(clientId, channel);
    }

    /**
     * Removes a channel from {@link #group_map}. The channel itself is not closed here.
     *
     * @param clientId session id of the channel
     * @return the channel that was registered under the id, or {@code null} if there was none
     */
    public Channel removeChannel(String clientId){
        return group_map.remove(clientId);
    }

    /**
     * Looks up a registered channel.
     *
     * @param clientId session id of the channel
     * @return the registered channel, or {@code null} if there is none
     */
    public Channel getChannel(String clientId){
        return group_map.get(clientId);
    }

    /**
     * Appends a message to the offline list of a user, stored under {@code "offline_" + key}.
     *
     * @param key id of the offline recipient
     * @param value message to cache
     */
    public void offline_message_save(Integer key, Object value){
        // Third argument is presumably the expiry in seconds (one day) -- TODO confirm against
        // RedisCacheTemplate.lSet.
        redisCacheTemplate.lSet("offline_"+key,value,60 * 60 * 24);
    }

    /**
     * Replays the messages cached for a user while offline, then deletes the cached list.
     * Group messages are written straight to the user's channel; point-to-point messages go
     * through the regular {@link #message} / {@link #fileMessage} path. Cached entries that are
     * not {@link SendMessage} instances, or whose type is neither text nor file, are ignored.
     *
     * @param message the login message; supplies the id of the user that came online
     * @param channel channel of the user that came online
     */
    public void Online_message_push(SendMessage message,Channel channel){
        String offlineKey = "offline_"+message.getUserId();
        List<Object> list = redisCacheTemplate.lGet(offlineKey,0,-1);
        // A missing list must not break login: treat null like "nothing cached".
        if(list == null || list.isEmpty()){
            return;
        }
        for (Object cached : list) {
            if (!(cached instanceof SendMessage)) {
                continue;
            }
            SendMessage offlineMessage = (SendMessage) cached;
            switch (offlineMessage.getType()){
                case SendMessage.TYPE_SAY:
                    if(SendMessage.MESSAGETYPE_POINT_TO_GROUP == offlineMessage.getMessageType()){
                        group_Online_msg(offlineMessage,channel);
                    }else {
                        message(offlineMessage,channel);
                    }
                    break;
                case SendMessage.TYPE_FILE:
                    if(SendMessage.MESSAGETYPE_POINT_TO_GROUP == offlineMessage.getMessageType()){
                        group_Online_msg(offlineMessage,channel);
                    }else {
                        fileMessage(offlineMessage,channel);
                    }
                    break;
            }
        }
        redisCacheTemplate.del(offlineKey);
    }

    /**
     * Writes a cached group message (text or file) directly to the channel of a user that has
     * just come online.
     *
     * @param message the cached group message
     * @param channel channel of the receiving user
     */
    public void group_Online_msg(SendMessage message,Channel channel){
        ResultMessage result;
        if(message.getType() == SendMessage.TYPE_FILE){
            // File messages carry their payload as a parsed MFile instead of the raw data.
            result = new ResultMessage(ResultMessage.RESULT_TYPE_FILEMESSAGE_GROUPING, message.getUserId() ,message.getData());
            result.setData(parseFile(message));
        }else{
            result = new ResultMessage(ResultMessage.RESULT_TYPE_MESSAGE_GROUPSENDING, message.getUserId() ,message.getData());
        }
        result.setGroupId(message.getGroupId());
        result.setFromUserId(message.getUserId());
        channel.writeAndFlush(MessageFactory.buildMessage(result));
    }
}
